[History Server][Feat] Add metadata collect logic #4431

Open

popojk wants to merge 25 commits into ray-project:master from popojk:add_metadata_write_logic_in_history_server

Conversation


popojk commented Jan 23, 2026

Why are these changes needed?

We need to implement metadata collection logic in order to implement dead-cluster behavior for the history server API endpoints below (a rough sketch of the collection loop follows the list):

/api/data/datasets/{job_id}
/api/serve/applications/
/api/v0/placement_groups/
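
As a rough sketch of the idea (not the PR's implementation): the collector sidecar periodically polls these Dashboard endpoints and persists the raw JSON responses so the history server can replay them once the cluster is gone. The output directory, file names, and local-file persistence below are placeholders; the real collector writes to blob storage (e.g. minio), and the per-job datasets endpoint is omitted for brevity.

package main

import (
    "fmt"
    "io"
    "net/http"
    "os"
    "path/filepath"
    "time"
)

// Cluster-wide metadata endpoints; the datasets endpoint is per job id.
var metaEndpoints = map[string]string{
    "applications":     "http://localhost:8265/api/serve/applications/",
    "placement_groups": "http://localhost:8265/api/v0/placement_groups",
}

// collectOnce fetches each endpoint and persists the raw JSON body.
func collectOnce(outDir string) error {
    if err := os.MkdirAll(outDir, 0o755); err != nil {
        return err
    }
    for name, url := range metaEndpoints {
        resp, err := http.Get(url)
        if err != nil {
            return fmt.Errorf("fetch %s: %w", url, err)
        }
        body, err := io.ReadAll(resp.Body)
        resp.Body.Close()
        if err != nil {
            return fmt.Errorf("read %s: %w", url, err)
        }
        if err := os.WriteFile(filepath.Join(outDir, name+".json"), body, 0o644); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    // Poll on a fixed interval while the cluster is alive.
    for {
        if err := collectOnce("/tmp/ray-meta"); err != nil {
            fmt.Fprintln(os.Stderr, "collect failed:", err)
        }
        time.Sleep(5 * time.Second)
    }
}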

Related issue number

#4384
#4385
#4386

Tests

The newly added features are mostly web API calls, which are hard to cover with unit tests, so we ran the manual tests below:

  1. Run a Ray cluster by executing kubectl apply -f historyserver/config/raycluster.yaml, which runs our collector as a sidecar container.

  2. Execute kubectl port-forward svc/raycluster-historyserver-head-svc 8265:8265 to access the Ray cluster.

  3. Execute the command below to add a placement group to the Ray cluster:

ray job submit --address http://localhost:8265 -- python -c "                                                                                                                                  
import ray
ray.init(address='auto')
pg = ray.util.placement_group(name='my_pg', strategy='PACK', bundles=[{'CPU': 0.2}])
ray.get(pg.ready())
print(f'Created: {pg.id.hex()}')
"

  4. Check minio: the placement group data is collected and stored.
(Screenshot: 2026-01-26 14:58:03)

  5. Execute curl -sS http://localhost:8265/api/v0/placement_groups to fetch the data from the Ray cluster directly; we get the same data.
(Screenshot: 2026-01-26 14:59:22)

  6. Execute kubectl apply -f historyserver/config/rayjob.yaml to test whether datasets data is collected.

  7. Datasets data was collected as expected.
(Screenshot: 2026-01-27 20:58:52)

  8. Execute the command below to serve an example app:
ray job submit --address http://localhost:8265 -- python -c "
from ray import serve
from ray.serve import Application

@serve.deployment
class MyDeployment:
  pass

app: Application = MyDeployment.bind()
serve.run(app)
"
  9. The application showed up in minio as expected.
(Screenshot: 2026-01-30 18:50:30)

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

400Ping (Contributor) commented Jan 26, 2026

I think overall it looks good.

popojk and others added 3 commits January 27, 2026 20:55
fweilun (Contributor) left a comment

nit. Overall, looks great.

Comment on lines 21 to 32
var metaCommonUrlInfo = []*types.UrlInfo{
    &types.UrlInfo{
        Key:  utils.OssMetaFile_Applications,
        Url:  "http://localhost:8265/api/serve/applications/",
        Type: "URL",
    },
    &types.UrlInfo{
        Key:  utils.OssMetaFile_PlacementGroups,
        Url:  "http://localhost:8265/api/v0/placement_groups",
        Type: "URL",
    },
}

Is &types.UrlInfo removable?
The slice type is already declared as []*types.UrlInfo.
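
For illustration, a sketch of the elided form, reusing the identifiers from the snippet above (Go allows dropping &types.UrlInfo in a composite literal when the slice element type is already *types.UrlInfo):

var metaCommonUrlInfo = []*types.UrlInfo{
    {
        Key:  utils.OssMetaFile_Applications,
        Url:  "http://localhost:8265/api/serve/applications/",
        Type: "URL",
    },
    {
        Key:  utils.OssMetaFile_PlacementGroups,
        Url:  "http://localhost:8265/api/v0/placement_groups",
        Type: "URL",
    },
}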

return body, nil
}

func (r *RayLogHandler) PersisDatasetsMeta() {

Looks like a typo here. (Persist)

400Ping (Contributor) left a comment

Overall LGTM, please resolve conflicts.

popojk changed the title from [WIP][History Server][Feat] Add metadata collect logic to [History Server][Feat] Add metadata collect logic on Jan 28, 2026
popojk marked this pull request as ready for review on January 28, 2026 10:34
}
currentJobIDs := make(map[string]string, 0)
for _, jobinfo := range jobsData {
job := jobinfo.(map[string]interface{})

Unsafe type assertion may cause panic

Medium Severity

The type assertion jobinfo.(map[string]interface{}) uses the bare form without checking if the assertion succeeded. If the API returns unexpected data (e.g., a null element in the jobs array or a different data structure), this will panic. Other code in this codebase uses the ok-form pattern for similar assertions.
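
For reference, a sketch of the comma-ok form the comment describes; jobsData mirrors the snippet above, and the "job_id" key is an assumption used only for illustration:

currentJobIDs := make(map[string]string)
for _, jobinfo := range jobsData {
    // Comma-ok assertion: skip malformed entries instead of panicking on
    // unexpected Dashboard responses (e.g. a null element in the jobs array).
    job, ok := jobinfo.(map[string]interface{})
    if !ok {
        continue
    }
    // "job_id" is assumed here; use whatever field the surrounding code reads.
    if id, ok := job["job_id"].(string); ok {
        currentJobIDs[id] = id
    }
}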


popojk and others added 3 commits January 28, 2026 18:39
Future-Outlier (Member) left a comment

Hi, @popojk @400Ping @fweilun can you give me an example to test these 3 endpoints?

currently I only see 1 example.

ray job submit --address http://localhost:8265 -- python -c "                                                                                                                                  
import ray
ray.init(address='auto')
pg = ray.util.placement_group(name='my_pg', strategy='PACK', bundles=[{'CPU': 0.2}])
ray.get(pg.ready())
print(f'Created: {pg.id.hex()}')
"

Future-Outlier (Member) left a comment

  1. I need you to provide an example that can run end-to-end and lets me view, via the Ray Dashboard, information for:

    • Serve applications
    • Dataset endpoints
    • Placement group endpoints
      And please record a video showing it.

    a. If possible, add a new file named rayjob-workaround.yaml and include the example there.
    Try to make it look as similar to rayjob.yaml as possible.

  2. There’s a mistake in the PR description: running kubectl apply -f historyserver/config/historyserver.yaml will not create a RayCluster.

  3. For the collector, please add an environment variable to enable this “persist metadata” function (when enabled, it should try to fetch data from blob storage), e.g.:

env:
  - name: SUPPORT_RAY_EVENT_UNSUPPORTED_DATA
    value: 1 # or true

  4. The endpoints will hit port 8265. Is there a way to know the Ray Dashboard port ahead of time? Hardcoding it doesn’t feel great. (A rough sketch of reading these settings follows this list.)
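
A rough sketch (not the PR's implementation) of how the collector could read these two settings: parse the flag as a boolean and fall back to a default dashboard address instead of hardcoding 8265 at each call site. The RAY_DASHBOARD_ADDRESS name anticipates the env var added later in this PR.

package main

import (
    "fmt"
    "os"
    "strconv"
)

// persistMetaEnabled reports whether the "persist metadata" feature is on.
// strconv.ParseBool accepts "1", "true", "TRUE", etc.
func persistMetaEnabled() bool {
    enabled, err := strconv.ParseBool(os.Getenv("SUPPORT_RAY_EVENT_UNSUPPORTED_DATA"))
    return err == nil && enabled
}

// dashboardAddress returns the configured Dashboard address, defaulting to the
// conventional port 8265 when the env var is unset.
func dashboardAddress() string {
    if addr := os.Getenv("RAY_DASHBOARD_ADDRESS"); addr != "" {
        return addr
    }
    return "http://localhost:8265"
}

func main() {
    fmt.Println("persist metadata enabled:", persistMetaEnabled())
    fmt.Println("dashboard address:", dashboardAddress())
}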

})
} else if !existingJob.StopPersist {
// Update status for existing jobs only if not already stopped persisting
existingJob.Status = status

Race condition modifying struct fields outside mutex

Medium Severity

After calling jobResourcesUrlInfo.Get(), the returned *types.JobUrlInfo pointer is modified (existingJob.Status and urlInfo.StopPersist) outside of any lock. The Get method releases its read lock before the modifications occur, creating a data race if these fields are accessed concurrently.
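
Illustrative only: one way to avoid the race is to let the store own the mutation, so the write happens under the same lock that guards the map. The type and field names below mirror the comment, not the PR's actual API.

package collector

import "sync"

type JobUrlInfo struct {
    Status      string
    StopPersist bool
}

type jobStore struct {
    mu   sync.RWMutex
    jobs map[string]*JobUrlInfo
}

// UpdateStatus mutates the entry while holding the write lock, instead of
// returning the pointer from a Get() and modifying its fields outside the lock.
func (s *jobStore) UpdateStatus(jobID, status string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if job, ok := s.jobs[jobID]; ok && !job.StopPersist {
        job.Status = status
    }
}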


400Ping (Contributor) commented Feb 1, 2026

Here's the yaml I used to test:

# RayJob that demonstrates endpoints used by the "persist metadata" workaround:
# - Placement groups (api/v0/placement_groups)
# - Serve applications (api/serve/applications/)
# - Datasets (api/jobs/ + api/data/datasets/{job_id})
#
# Use this with the History Server setup guide (Step 6). Ensure the collector
# has SUPPORT_RAY_EVENT_UNSUPPORTED_DATA enabled so it persists these to blob storage.
# After the job runs, you can view the data in Ray Dashboard, and after cluster
# teardown, via History Server (from persisted meta).
#
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-workaround
spec:
  entrypoint: |
    python -c "
    import ray
    import time

    ray.init()

    # --- 1. Placement group (visible under api/v0/placement_groups) ---
    pg = ray.util.placement_group(
        name='workaround_pg',
        strategy='PACK',
        bundles=[{'CPU': 0.2}]
    )
    ray.get(pg.ready())
    print('Placement group created:', pg.id.hex())

    # --- 2. Serve application (visible under api/serve/applications/) ---
    from ray import serve

    @serve.deployment(num_replicas=1)
    def hello(request):
        return {'message': 'hello from workaround'}

    serve.run(hello.bind(), name='workaround_app')
    print('Serve application deployed: workaround_app')

    # --- 3. Dataset (creates job + dataset; visible under api/jobs/ and api/data/datasets/) ---
    ds = ray.data.range(10)
    n = ds.count()
    print('Dataset created, count:', n)

    # Keep running so the collector can poll Dashboard API and persist meta (e.g. every 5s).
    # Wait long enough for at least one full persist cycle.
    print('Waiting 15s for collector to persist metadata...')
    time.sleep(15)

    # Align with existing guide: allow events to be sent before shutdown.
    time.sleep(5)
    print('Done.')
    "
  clusterSelector:
    ray.io/cluster: raycluster-historyserver

cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

dashboardAddress := os.Getenv("RAY_DASHBOARD_ADDRESS")
if dashboardAddress == "" {
panic(fmt.Errorf("missing RAY_DASHBOARD_ADDRESS in environment variables"))
}

Worker nodes panic when RAY_DASHBOARD_ADDRESS is unset

Medium Severity

The collector unconditionally panics if RAY_DASHBOARD_ADDRESS is not set, regardless of node role. However, the dashboard address is only needed when SupportRayEventUnSupportData is enabled, which only applies to Head nodes. Worker nodes will crash unnecessarily if this environment variable is missing, even though they never use it for metadata collection.
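
A sketch of the gating the bot suggests; isHeadNode and supportRayEventUnsupportedData are hypothetical placeholders for however the collector tracks node role and the feature flag:

// Only require RAY_DASHBOARD_ADDRESS when metadata collection is actually
// enabled on this node, instead of panicking unconditionally on every node.
dashboardAddress := os.Getenv("RAY_DASHBOARD_ADDRESS")
if isHeadNode && supportRayEventUnsupportedData && dashboardAddress == "" {
    panic(fmt.Errorf("RAY_DASHBOARD_ADDRESS must be set when metadata collection is enabled"))
}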


popojk (Author) commented Feb 1, 2026


@Future-Outlier Changes made:

  1. Added rayjob_workaround.yaml to create a placement group, a Serve application, and datasets.
  2. Fixed the PR description.
  3. Added the SUPPORT_RAY_EVENT_UNSUPPORTED_DATA env var to the Ray cluster to enable metadata collection.
  4. Added the RAY_DASHBOARD_ADDRESS env var to the Ray cluster to set the dashboard address for the collector.

Copilot AI mentioned this pull request Feb 5, 2026
